Producer / consumer with Goroutines
Producer/consumer is a design pattern in which one or more consumers receive data for processing from a producer. Today we will implement this pattern using goroutines. First we will implement a simple version in which the data is processed and everything exits. Later we will upgrade it to handle errors if any occur.
Version 1
The first implementation will consist of:
- producer
- n consumers
- wait group
- single channel
Producer
The producer takes a channel for sending data to the consumers, plus the data itself. Iterating over a slice of integers, it sends the values one by one through the channel and closes the channel once it is done.
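    // producer sends every value in data to buff, then closes the channel
    // so that consumers know no more values are coming.
    func producer(buff chan int, data []int) {
        defer close(buff)
        for _, i := range data {
            fmt.Printf("Producer sending %d to channel\n", i)
            buff <- i
        }
        fmt.Println("Producer exiting")
    }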
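Closing the channel does not discard values that are still sitting in the buffer. The sketch below illustrates this with a hypothetical helper, drain (not part of the original program): receivers keep getting the buffered values, and only afterwards does the second result of the receive become false.

```go
package main

import "fmt"

// drain is a hypothetical helper: it receives from ch until the channel
// is closed and empty, and returns everything it received.
func drain(ch <-chan int) []int {
	var got []int
	for {
		v, ok := <-ch
		if !ok {
			// ok is false only once the channel is closed AND drained.
			return got
		}
		got = append(got, v)
	}
}

func main() {
	ch := make(chan int, 3)
	ch <- 1
	ch <- 2
	close(ch) // values already in the buffer stay readable

	fmt.Println(drain(ch)) // prints [1 2]
}
```

This is why the producer can close the channel as soon as it has sent the last value, without waiting for the consumers to catch up.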
Consumers
Each consumer loops forever and tries to fetch data from the channel. If a value is received, the consumer processes it. Once the channel is closed and empty, the consumer quits and the deferred call to Done on the sync.WaitGroup runs.
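    // consumer receives values from buff until the channel is closed and
    // drained, then signals the wait group on its way out.
    func consumer(idx int, buff chan int, wg *sync.WaitGroup) {
        defer wg.Done()
        for {
            i, ok := <-buff
            if ok {
                fmt.Printf("Consumer #%d: received %d\n", idx, i)
                time.Sleep(1 * time.Second) // simulate work
            } else {
                fmt.Printf("Consumer #%d: no more values to process, exiting\n", idx)
                return
            }
        }
    }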
Main
We define a sync.WaitGroup to synchronize the goroutines and return from the main function only after all consumers are done. The first implementation uses a single channel for sending data from the producer to the consumers. We start the producer in its own goroutine, followed by n consumers (3 in this case), incrementing the sync.WaitGroup for each one. Then the main function calls Wait and blocks until every consumer has finished.
    package main

    import (
        "fmt"
        "sync"
        "time"
    )

    func main() {
        var wg sync.WaitGroup
        const workersCount = 3
        var buffer = make(chan int, workersCount)
        data := []int{1, 2, 3, 4, 5, 6}

        fmt.Println("Starting producer")
        go producer(buffer, data)

        for i := 0; i < workersCount; i++ {
            wg.Add(1) // one Done call is expected from each consumer
            fmt.Printf("Starting consumer #%d\n", i)
            go consumer(i, buffer, &wg)
        }

        fmt.Println("Main waiting")
        wg.Wait() // blocks until every consumer has called Done
        fmt.Println("Main exiting")
    }
Output
The producer is started first, followed by the 3 consumers, and then the main function starts waiting. The producer sends the values to the channel and exits. Each consumer takes one value at a time and processes it until there is no data left. At this point all consumers have called Done, so the main function stops waiting and exits on its own. The exact interleaving of the lines depends on goroutine scheduling and can differ between runs.
    Starting producer
    Starting consumer #0
    Starting consumer #1
    Starting consumer #2
    Main waiting
    Producer sending 1 to channel
    Producer sending 2 to channel
    Producer sending 3 to channel
    Producer sending 4 to channel
    Producer sending 5 to channel
    Producer sending 6 to channel
    Producer exiting
    Consumer #1: received 3
    Consumer #0: received 2
    Consumer #2: received 1
    Consumer #2: received 4
    Consumer #1: received 5
    Consumer #0: received 6
    Consumer #0: no more values to process, exiting
    Consumer #2: no more values to process, exiting
    Consumer #1: no more values to process, exiting
    Main exiting
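Because the order in which consumers pick up values is not fixed, it is easier to check the pattern by looking at what was processed than by reading the log. The sketch below is one way to do that, under assumptions: fanOut and the results channel are hypothetical additions, and the consumers only forward values instead of sleeping.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// fanOut is a hypothetical helper: it pushes data through one producer
// and `workers` consumers, collects what the consumers received, and
// returns it sorted so the result does not depend on scheduling.
func fanOut(data []int, workers int) []int {
	buff := make(chan int, workers)
	// Large enough for every value, so consumers never block on it.
	results := make(chan int, len(data))
	var wg sync.WaitGroup

	go func() {
		defer close(buff)
		for _, v := range data {
			buff <- v
		}
	}()

	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for v := range buff {
				results <- v
			}
		}()
	}

	wg.Wait()
	close(results) // safe: all senders have finished

	var got []int
	for v := range results {
		got = append(got, v)
	}
	sort.Ints(got)
	return got
}

func main() {
	fmt.Println(fanOut([]int{1, 2, 3, 4, 5, 6}, 3)) // prints [1 2 3 4 5 6]
}
```

Each value shows up exactly once in the result, whichever consumer happened to receive it.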
Version 2
In the second implementation we want to add error handling. The consumers and the producer will check whether an error has been raised before processing data. If one has, they quit.
The second implementation will consist of:
- producer
- n consumers
- wait group
- two channels
- context
Producer
The upgraded producer now takes a context as an argument. Before sending a value to the channel, it checks whether the context's cancel function has been called. As we will see in the upgraded consumer, the cancel function is called when an error occurs while processing a value. If there is no error, the producer sends values as before.
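    // producer sends data to buff and stops early once ctx is cancelled.
    func producer(buff chan int, ctx context.Context, data []int) {
        defer close(buff)
        for _, i := range data {
            // Stop if a consumer has already reported an error.
            select {
            case <-ctx.Done():
                fmt.Printf("Producer: error occurred, exiting\n")
                return
            default:
            }
            fmt.Printf("Producer sending %d to channel\n", i)
            // Keep watching ctx while sending, so a full buffer cannot
            // block the producer after the consumers have exited.
            select {
            case buff <- i:
            case <-ctx.Done():
                fmt.Printf("Producer: error occurred, exiting\n")
                return
            }
        }
        fmt.Println("Producer exiting")
    }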
Consumers
The biggest change happens here, in the consumers. The function parameters now include a new channel, a context and a cancel function. The new channel is used to send errors that are raised inside the data-processing function. The context is used to check whether any consumer has failed and called the cancel function. The cancel function is called in case of an error. For demonstration purposes, an error is raised if the value is equal to 3.
    func consumer(idx int, buff chan int, errs chan error, wg *sync.WaitGroup, ctx context.Context, cancel func()) {
        defer wg.Done()

        // procedure simulates the processing step; it fails on the value 3.
        procedure := func(value int) error {
            if value == 3 {
                msg := fmt.Sprintf("Consumer #%d: Can't count to 3", idx)
                return errors.New(msg)
            }
            fmt.Printf("Consumer #%d: processing %d\n", idx, value)
            return nil
        }

        for {
            // Quit if another consumer has already called cancel.
            select {
            case <-ctx.Done():
                fmt.Printf("Consumer #%d: somewhere error occurred, exiting\n", idx)
                return
            default:
            }

            i, ok := <-buff
            if ok {
                fmt.Printf("Consumer #%d: received %d\n", idx, i)
                err := procedure(i)
                if err != nil {
                    fmt.Printf("Consumer #%d: procedure returned an error, exiting\n", idx)
                    errs <- err // report the error to main
                    cancel()    // tell everyone else to stop
                    return
                }
                time.Sleep(1 * time.Second) // simulate work
            } else {
                fmt.Printf("Consumer #%d: no more values to process, exiting\n", idx)
                return
            }
        }
    }
Main
The important points are deferring the call to the cancel function and, once all consumers are done, receiving any errors they raised. Apart from creating the context and the errors channel, the main function is unchanged.
    package main

    import (
        "context"
        "errors"
        "fmt"
        "sync"
        "time"
    )

    func main() {
        var wg sync.WaitGroup
        const workersCount = 3
        var buffer = make(chan int, workersCount)
        // Buffered so that every consumer can report an error without blocking.
        var errs = make(chan error, workersCount)
        ctx, cancel := context.WithCancel(context.Background())
        defer cancel()
        data := []int{1, 2, 3, 4, 5, 6}

        fmt.Println("Starting producer")
        go producer(buffer, ctx, data)

        for i := 0; i < workersCount; i++ {
            wg.Add(1)
            fmt.Printf("Starting consumer #%d\n", i)
            go consumer(i, buffer, errs, &wg, ctx, cancel)
        }

        fmt.Println("Main waiting")
        wg.Wait()

        // All consumers have finished, so nothing can send on errs anymore.
        close(errs)
        for e := range errs {
            fmt.Println("Error received", e)
        }
        fmt.Println("No errors left")
    }
Output
The output looks similar until the value 3 is received by consumer #0. At that point the procedure returns an error, which is sent through the errors channel, and the cancel function is called. The other consumers check the context for the done signal and exit. The producer does that too, but in our example it manages to send all the data before the error is raised. Lastly, the main function receives the error and prints it out.
    Starting producer
    Starting consumer #0
    Starting consumer #1
    Starting consumer #2
    Main waiting
    Producer sending 1 to channel
    Producer sending 2 to channel
    Producer sending 3 to channel
    Producer sending 4 to channel
    Producer sending 5 to channel
    Consumer #2: received 1
    Consumer #2: processing 1
    Consumer #1: received 2
    Consumer #1: processing 2
    Producer sending 6 to channel
    Consumer #0: received 3
    Consumer #0: procedure returned an error, exiting
    Producer exiting
    Consumer #1: somewhere error occurred, exiting
    Consumer #2: somewhere error occurred, exiting
    Error received Consumer #0: Can't count to 3
    No errors left
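If only the first error matters, the context itself can carry it, which removes the need for a separate errors channel. The sketch below assumes Go 1.20 or newer, where context.WithCancelCause and context.Cause are available. firstError is a hypothetical function written for this illustration and is not part of the program above.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// firstError is a hypothetical helper: it processes each value in its
// own goroutine and returns the first error passed to cancel, or nil
// if no goroutine failed.
func firstError(values []int) error {
	ctx, cancel := context.WithCancelCause(context.Background())
	defer cancel(nil) // release the context's resources on return

	var wg sync.WaitGroup
	for _, v := range values {
		wg.Add(1)
		go func(v int) {
			defer wg.Done()
			if v == 3 {
				// Only the first call to cancel records its cause.
				cancel(errors.New("can't count to 3"))
			}
		}(v)
	}
	wg.Wait()

	// Cause returns nil while the context has not been cancelled.
	return context.Cause(ctx)
}

func main() {
	fmt.Println(firstError([]int{1, 2, 3})) // prints can't count to 3
	fmt.Println(firstError([]int{1, 2}))    // prints <nil>
}
```

The trade-off is that later errors are dropped, whereas the buffered errors channel keeps one error per consumer.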
Conclusion
Go's concurrency tools have been the most fun to work with so far. Channels, contexts and wait groups make it fairly easy to control concurrent programs and adapt the code to your needs. We implemented a simple version of the producer/consumer pattern using only a single channel and a wait group. The context allowed us to upgrade it so that we can handle errors and stop the whole program if any error occurs.